//go:build functional

package sarama

import (
	"context"
	"fmt"
	"maps"
	"slices"
	"strconv"
	"testing"
	"time"

	"github.com/davecgh/go-spew/spew"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestFuncAdminQuotas walks through the client-quota admin API against a live
// cluster: it verifies no quotas exist, sets a producer quota on the default
// user and a consumer quota on a specific user/client-id pair, polls until
// each one becomes visible through DescribeClientQuotas, and finally removes
// both entries again.
func TestFuncAdminQuotas(t *testing.T) {
	const (
		waitFor = 10 * time.Second
		tick    = 100 * time.Millisecond
	)
	checkKafkaVersion(t, "2.6.0.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	version, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
	if err != nil {
		t.Fatal(err)
	}

	cfg := NewFunctionalTestConfig()
	cfg.Version = version
	admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, admin)

	// The cluster must start without any quota entries.
	initial, err := admin.DescribeClientQuotas(nil, false)
	if err != nil {
		t.Fatal(err)
	}
	if len(initial) != 0 {
		t.Fatalf("Expected quotas to be empty at start, found: %v", initial)
	}

	// Quota on the default user entity: /config/users/<default>
	defaultUser := []QuotaEntityComponent{
		{EntityType: QuotaEntityUser, MatchType: QuotaMatchDefault},
	}
	produceOp := ClientQuotasOp{Key: "producer_byte_rate", Value: 1024000}
	if err := admin.AlterClientQuotas(defaultUser, produceOp, false); err != nil {
		t.Fatal(err)
	}

	// The alteration is not necessarily visible immediately, so poll for it.
	defaultUserFilters := []QuotaFilterComponent{
		{EntityType: QuotaEntityUser, MatchType: QuotaMatchDefault},
	}
	require.EventuallyWithT(t, func(t *assert.CollectT) {
		found, err := admin.DescribeClientQuotas(defaultUserFilters, false)
		require.NoError(t, err)
		require.NotEmpty(t, found, "Expected not empty quotas for default user")
		require.Len(t, found, 1, "Expected one quota entry for default user")
	}, waitFor, tick, "Quotas state has still not updated for default user")

	// Quota on a specific client-id of a specific user:
	// /config/users/<user>/clients/<client-id>
	const (
		userName = "sarama"
		clientID = "sarama-consumer"
	)
	specificUserClientID := []QuotaEntityComponent{
		{EntityType: QuotaEntityUser, MatchType: QuotaMatchExact, Name: userName},
		{EntityType: QuotaEntityClientID, MatchType: QuotaMatchExact, Name: clientID},
	}
	consumeOp := ClientQuotasOp{Key: "consumer_byte_rate", Value: 2048000}
	if err := admin.AlterClientQuotas(specificUserClientID, consumeOp, false); err != nil {
		t.Fatal(err)
	}

	// Query exactly that entry and check the stored value.
	specificFilters := []QuotaFilterComponent{
		{EntityType: QuotaEntityUser, MatchType: QuotaMatchExact, Match: userName},
		{EntityType: QuotaEntityClientID, MatchType: QuotaMatchExact, Match: clientID},
	}
	require.EventuallyWithT(t, func(t *assert.CollectT) {
		found, err := admin.DescribeClientQuotas(specificFilters, true)
		require.NoError(t, err)
		require.NotEmpty(t, found, "Expected not empty quotas for specific clientID")
		require.Len(t, found, 1, "Expected one quota entry for specific clientID")
		got := found[0].Values[consumeOp.Key]
		require.InDelta(
			t,
			got,
			consumeOp.Value,
			0.01,
			"Expected specific quota value to be %f, found: %v",
			consumeOp.Value,
			got,
		)
	}, waitFor, tick, "Quotas state for specific clientID has still not updated")

	// Remove both quota entries again.
	removeProduce := ClientQuotasOp{Key: produceOp.Key, Remove: true}
	if err := admin.AlterClientQuotas(defaultUser, removeProduce, false); err != nil {
		t.Fatal(err)
	}

	removeConsume := ClientQuotasOp{Key: consumeOp.Key, Remove: true}
	if err := admin.AlterClientQuotas(specificUserClientID, removeConsume, false); err != nil {
		t.Fatal(err)
	}
}

// TestFuncAdminDescribeGroups starts one consumer-group member in each of two
// freshly named groups (the second one with a group instance id), waits for
// both members to reach state 2, and then checks that DescribeConsumerGroups
// returns one description per group with exactly one member each.
func TestFuncAdminDescribeGroups(t *testing.T) {
	checkKafkaVersion(t, "2.3.0.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	group1 := testFuncConsumerGroupID(t)
	group2 := testFuncConsumerGroupID(t)

	kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
	if err != nil {
		t.Fatal(err)
	}

	config := NewFunctionalTestConfig()
	config.Version = kafkaVersion
	adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, adminClient)

	config1 := NewFunctionalTestConfig()
	config1.ClientID = "M1"
	config1.Version = V2_3_0_0
	config1.Consumer.Offsets.Initial = OffsetNewest
	m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, group1, 100, nil, "test.4")
	defer m1.Close()

	config2 := NewFunctionalTestConfig()
	config2.ClientID = "M2"
	config2.Version = V2_3_0_0
	config2.Consumer.Offsets.Initial = OffsetNewest
	config2.Consumer.Group.InstanceId = "Instance2"
	m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, group2, 100, nil, "test.4")
	defer m2.Close()

	// NOTE(review): state 2 presumably means the member has joined and is
	// consuming; confirm against the test member helper.
	m1.WaitForState(2)
	m2.WaitForState(2)

	res, err := adminClient.DescribeConsumerGroups([]string{group1, group2})
	if err != nil {
		t.Fatal(err)
	}
	// Abort here rather than merely recording the failure: the checks below
	// index res[0] and res[1] and would panic on a shorter response.
	if len(res) != 2 {
		t.Fatalf("group description should be 2, got %v\n", len(res))
	}
	if len(res[0].Members) != 1 {
		t.Errorf("should have 1 members in group , got %v\n", len(res[0].Members))
	}
	if len(res[1].Members) != 1 {
		t.Errorf("should have 1 members in group , got %v\n", len(res[1].Members))
	}

	m1.AssertCleanShutdown()
	m2.AssertCleanShutdown()
}

// TestFuncAdminListConsumerGroups runs one member in each of two freshly named
// consumer groups and verifies that both group names are reported by
// ListConsumerGroups.
func TestFuncAdminListConsumerGroups(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	firstGroup := testFuncConsumerGroupID(t)
	secondGroup := testFuncConsumerGroupID(t)

	admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, admin)

	// First member: plain (dynamic) membership.
	firstCfg := NewFunctionalTestConfig()
	firstCfg.ClientID = "M1"
	firstCfg.Consumer.Offsets.Initial = OffsetNewest
	firstMember := runTestFuncConsumerGroupMemberWithConfig(t, firstCfg, firstGroup, 100, nil, "test.4")
	defer firstMember.Close()

	// Second member: same setup, but with a group instance id configured.
	secondCfg := NewFunctionalTestConfig()
	secondCfg.ClientID = "M2"
	secondCfg.Consumer.Offsets.Initial = OffsetNewest
	secondCfg.Consumer.Group.InstanceId = "Instance2"
	secondMember := runTestFuncConsumerGroupMemberWithConfig(t, secondCfg, secondGroup, 100, nil, "test.4")
	defer secondMember.Close()

	firstMember.WaitForState(2)
	secondMember.WaitForState(2)

	listed, err := admin.ListConsumerGroups()
	if err != nil {
		t.Fatal(err)
	}

	// Other groups may exist on the cluster, so only require that ours are
	// among the listed names.
	names := slices.Collect(maps.Keys(listed))
	assert.GreaterOrEqual(t, len(listed), 2)
	assert.Contains(t, names, firstGroup)
	assert.Contains(t, names, secondGroup)

	firstMember.AssertCleanShutdown()
	secondMember.AssertCleanShutdown()
}

// TestFuncAdminListConsumerGroupOffsets commits an offset for a fresh consumer
// group, lists the group's offsets through the cluster admin, stops the broker
// that is currently the group's coordinator, and then checks that listing the
// offsets and looking up the coordinator still succeed afterwards. The stopped
// broker is started again during test cleanup.
func TestFuncAdminListConsumerGroupOffsets(t *testing.T) {
	checkKafkaVersion(t, "0.8.2.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewFunctionalTestConfig()
	config.ClientID = t.Name()
	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	// Register the close only once the client is known to be valid, so a
	// failed NewClient does not lead to closing a nil client.
	defer safeClose(t, client)

	group := testFuncConsumerGroupID(t)
	consumerGroup, err := NewConsumerGroupFromClient(group, client)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, consumerGroup)

	offsetMgr, err := NewOffsetManagerFromClient(group, client)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, offsetMgr)
	markOffset(t, offsetMgr, "test.4", 0, 2)
	offsetMgr.Commit()

	coordinator, err := client.Coordinator(group)
	if err != nil {
		t.Fatal(err)
	}

	t.Logf("coordinator broker %d", coordinator.id)

	adminClient, err := NewClusterAdminFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	// Baseline: list offsets while the original coordinator is still up.
	{
		resp, err := adminClient.ListConsumerGroupOffsets(group, map[string][]int32{"test.4": {0, 1, 2, 3}})
		if err != nil {
			t.Fatal(err)
		}
		t.Log(spew.Sdump(resp))
	}

	brokerID := coordinator.id
	if err := stopDockerTestBroker(context.Background(), brokerID); err != nil {
		t.Fatal(err)
	}

	// Bring the broker back once the test is over, whatever the outcome.
	t.Cleanup(
		func() {
			if err := startDockerTestBroker(context.Background(), brokerID); err != nil {
				t.Fatal(err)
			}
		},
	)

	// The same request must still succeed with the old coordinator stopped.
	{
		resp, err := adminClient.ListConsumerGroupOffsets(group, map[string][]int32{"test.4": {0, 1, 2, 3}})
		if err != nil {
			t.Fatal(err)
		}
		t.Log(spew.Sdump(resp))
	}

	coordinator, err = adminClient.Coordinator(group)
	if err != nil {
		t.Fatal(err)
	}

	t.Logf("coordinator broker %d", coordinator.id)
}

// TestFuncAdminDescribeLogDirs requests the log directories of every test
// broker and checks that each broker replied, that no log directory reports an
// error, and, on Kafka 3.3 and newer, that total and usable byte counts are
// populated.
func TestFuncAdminDescribeLogDirs(t *testing.T) {
	checkKafkaVersion(t, "2.0.0.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
	if err != nil {
		t.Fatal(err)
	}

	config := NewFunctionalTestConfig()
	adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, adminClient)

	// NOTE(review): assumes the test brokers have ids 1..N in address order;
	// confirm against the functional test environment.
	brokerIDs := make([]int32, len(FunctionalTestEnv.KafkaBrokerAddrs))
	for i := range brokerIDs {
		brokerIDs[i] = int32(i + 1)
	}

	res, err := adminClient.DescribeLogDirs(brokerIDs)
	if err != nil {
		t.Fatal(err)
	}
	if len(res) != len(FunctionalTestEnv.KafkaBrokerAddrs) {
		t.Errorf("should have %d broker replies, got %v\n", len(FunctionalTestEnv.KafkaBrokerAddrs), len(res))
	}

	for _, resp := range res {
		for _, logDir := range resp {
			// testify expects (expected, actual); this order keeps the
			// failure output labelled correctly.
			assert.Equal(t, ErrNoError, logDir.ErrorCode)
			// assert that total bytes and usable bytes were returned for kafka 3.3 and newer
			if kafkaVersion.IsAtLeast(V3_3_0_0) {
				assert.NotZero(t, logDir.TotalBytes)
				assert.NotZero(t, logDir.UsableBytes)
			}
		}
	}
}

// TestFuncAdminDeleteGroup creates a consumer group by committing an offset
// for it, confirms the group shows up in ListConsumerGroups, deletes it with
// DeleteConsumerGroup, and confirms it is no longer listed.
func TestFuncAdminDeleteGroup(t *testing.T) {
	checkKafkaVersion(t, "2.4.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)
	config := NewFunctionalTestConfig()
	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	// Register the close only once the client is known to be valid, so a
	// failed NewClient does not lead to closing a nil client.
	defer safeClose(t, client)

	// create a consumer group
	groupID := testFuncConsumerGroupID(t)
	consumerGroup, err := NewConsumerGroupFromClient(groupID, client)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, consumerGroup)

	offsetMgr, err := NewOffsetManagerFromClient(groupID, client)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, offsetMgr)
	markOffset(t, offsetMgr, "test.1", 0, 1)
	offsetMgr.Commit()

	admin, err := NewClusterAdminFromClient(client)
	if err != nil {
		t.Fatal(err)
	}

	// The group must be listed before it is deleted...
	groups, err := admin.ListConsumerGroups()
	if err != nil {
		t.Fatal(err)
	}
	if _, ok := groups[groupID]; !ok {
		t.Fatalf("Expected test group, %s, not found.", groupID)
	}

	err = admin.DeleteConsumerGroup(groupID)
	if err != nil {
		t.Fatal(err)
	}

	// ...and gone afterwards.
	groups, err = admin.ListConsumerGroups()
	if err != nil {
		t.Fatal(err)
	}
	if _, ok := groups[groupID]; ok {
		t.Fatalf("Expected test group, %s, found after delete.", groupID)
	}
}

// TestFuncAdminDeleteTopic creates a single-partition, replication-factor-1
// topic through the cluster admin and deletes it again, failing on the first
// error either call returns.
func TestFuncAdminDeleteTopic(t *testing.T) {
	checkKafkaVersion(t, "0.10.0.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	const topic = "delete_topic_test"

	admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, admin)

	detail := &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}
	if err := admin.CreateTopic(topic, detail, false); err != nil {
		t.Fatal(err)
	}
	if err := admin.DeleteTopic(topic); err != nil {
		t.Fatal(err)
	}
}

// TestFuncAdminIncrementalAlterConfigs reads the current value of
// log.cleaner.backoff.ms from broker "1", increments it by one, and writes the
// new value back with IncrementalAlterConfig using the "set" operation. The
// test passes when the alter request is accepted.
func TestFuncAdminIncrementalAlterConfigs(t *testing.T) {
	checkKafkaVersion(t, "2.3.0.0")
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	config := NewFunctionalTestConfig()
	adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer safeClose(t, adminClient)

	// getConfigValue fetches the named config entry from broker "1" and
	// returns it parsed as an integer, failing the test on any mismatch.
	getConfigValue := func(name string) int {
		resource := ConfigResource{
			Type:        BrokerResource,
			Name:        "1",
			ConfigNames: []string{name},
		}
		res, err := adminClient.DescribeConfig(resource)
		if err != nil {
			t.Fatal(err)
		}
		if len(res) != 1 {
			t.Fatalf("expected 1 config in response but got %d", len(res))
		}
		if res[0].Name != name {
			t.Fatalf("expected config in response name to be '%s' but got '%s'", name, res[0].Name)
		}
		n, err := strconv.Atoi(res[0].Value)
		if err != nil {
			t.Fatalf("failed to parse config in response value '%s': %v", res[0].Value, err)
		}
		return n
	}

	configName := "log.cleaner.backoff.ms"
	// Bump the current value by one so the request always carries a value
	// different from the one the broker currently reports.
	n := getConfigValue(configName)
	n++
	value := fmt.Sprintf("%d", n)
	err = adminClient.IncrementalAlterConfig(BrokerResource, "1",
		map[string]IncrementalAlterConfigsEntry{
			configName: {
				Operation: IncrementalAlterConfigsOperationSet,
				Value:     &value,
			},
		}, false)
	if err != nil {
		t.Fatalf("failed to alter config: %v", err)
	}
}
